home
***
CD-ROM
|
disk
|
FTP
|
other
***
search
/
The World of Computer Software
/
The World of Computer Software.iso
/
proxy11.zip
/
DATAFLOW.TXT
< prev
next >
Wrap
Text File
|
1992-12-30
|
5KB
|
93 lines
Proxy v1.2 (tasking) provides a tasking facility using the message
passing paradigm. The basic constructs are tasks and channels.
A task is defined by prefixing the 'task' modifier to a function
definition. Channels are created calling the built-in function of
no arguments: channel(). Other constructs are the start operator
which, when applied to an argument list, activates a task, the
skip operator which terminates a task, (normally, a task only
terminates when it reaches the end of the task body), an input
operator ? which is applied to a channel and an input variable, and
an output operator ! which is applied to a channel and an
expression. The syntax of the input/output operators is:
channel-id ? identifier The current task is blocked until
a message is available in channel-id.
Then the message is stored in
identifier.
channel-id ! expression Expression is evaluated and the
value is transmitted in channel-id.
The use of these operators is shown in a simple dataflow
diagram where tasks correspond to process nodes, and channels
correspond to data flows.
___________ _________ __________
| | trans| |group | |
| Source |______| Group |______| Sink |
| | | | | |
|_________| |_______| |________|
There are three process nodes, Source, Group and Sink. Source produces
a stream of records or transactions, where each record consists of
a partno (which may be considered to be an integer for simplicity)
and a quantity. Quantities may be positive or negative, and may be
thought of as representing receipts or issues into or out of a
warehouse. Source obtains its data from a file, where each line
of the file consists of the partno and quantity. Further, the lines
of the file are considered to be sorted on the partno. The purpose
of Group is to collect all the records of the same partno into an
aggregate data structure and pass these aggregates to Sink. The
purpose of Sink is to produce a report giving for each partno the
net issue or receipt of that partno. The process nodes are represented
by tasks and the data flows by channels. We show below the Proxy program
(with appropriate comments), to execute the dataflow diagram.
ch1=channel(); // channels ch1 and ch2 are declared
ch2=channel();
eos="eos"; // the end of stream indicator is defined
// reduce adds algebraicly all the integers in a sequence.
reduce(x) {if(x==[]) return 0;else return hd x+reduce(tl x);};
struct trans {partno,qty;}; // trans is defined with components partno
// and quantity
task source(out) {p=open("srce.in","r");readln(p,partno,qty);
while(!eof) {
out!new trans(partno,qty); // creates a transaction record
readln(p,partno,qty); // from a file and sends it
} // on the output channel
out!eos; // at end of file, sends
}; // the end of stream indicator
task group(inp,out;x,y,g,p) {g=[];inp?x;p=x.partno;y=p;
while (x!=eos) {if(y==p) // concatenates a record to the
{g=g conc [x];inp?x; // sequence as long as partno
if(x==eos) // is the same, if input is eos
{out!g;out!eos;skip;} // outputs the sequence, eos
p=x.partno;} // and terminates the task
else {out!g;g=[];y=p;} // if partno is different,
} // outputs the sequence,
}; // initializes new sequence
// and partno
task sink(inp;x,y) {inp?x; // inputs a sequence
while(x!=eos)
{y=[z.qty:z<-x]; // y is a sequence of qty's
z=hd x; // from input, z is the first record
writeln("partno",z.partno,"net qty",reduce(y));
inp?x;}
};
task main() {start source(ch1);start group(ch1,ch2);start sink(ch2);};
The main task starts the tasks source, group and sink by applying them
to channel variables, where a particular channel variable serves both as
the output of one task and the input to another. Execution is initiated
by invoking the main task:
main();